package com.matrix.async.service.impl;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.matrix.async.bean.AsyncTaskBean;
import com.matrix.async.bean.AsyncTaskDataBean;
import com.matrix.async.bean.AsyncTaskPackageBean;
import com.matrix.async.core.InitAsync;
import com.matrix.async.core.Tools;
import com.matrix.async.dao.AsyncTaskDao;
import com.matrix.async.service.AsyncTaskService;
import com.matrix.core.pojo.PaginationVO;
import com.matrix.core.tools.LogUtil;
import com.matrix.core.tools.StringUtils;
import com.matrix.core.tools.UUIDUtil;

@Service("AsyncTaskService")
public class AsyncTaskServiceImpl implements AsyncTaskService {

	private static String ASYNC_WORK_ENV = "prd";
	// 0：待处理
	private static final String INIT = "0";
	// 1：处理中
	private static final String HANDING = "1";
	// 2：执行成功
	private static final String SUCCESS = "2";
	// 3：执行失败
	private static final String FAILED = "3";
	// 4：执行时异常
	private static final String EXCEPTION = "4";

	private static final String SUCCESS_TAKS_TABLE = "async_task_success";
	private static final String ERROR_TAKS_TABLE = "async_task_error";

	private static final String SUCCESS_TAKS__DATA_TABLE = "async_task_data_success";
	private static final String ERROR_TAKS_DATA_TABLE = "async_task_data_error";

	@Autowired
	AsyncTaskDao asyncTaskDao;
	@Autowired
	InitAsync initAsync;

	/**
	 * 创建异步任务
	 * 
	 * @param createBy
	 * @param taskType
	 */
	@Override
	@Transactional(rollbackFor = Exception.class)
	public void createTask(String createBy, String taskType, Map<String, String> dataMap) {
		// 1.构建任务包
		AsyncTaskPackageBean taskPackage = new AsyncTaskPackageBean();
		String packageId = Tools.createUUID();
		String taskId = Tools.createUUID();
		taskPackage.setId(packageId);
		taskPackage.setCreateBy(createBy);
		taskPackage.setCreateTime(new Date());
		// 指定是谁创建的 谁创建谁消费  根据createBy参数判断
		// 若createBy为null表示任意机器都可以消费这个任务 若不为null表示只能由创建机器执行
		if(StringUtils.isBlank(createBy)) {
			taskPackage.setEnvironment(null);
		}else {
			taskPackage.setEnvironment(Tools.getHostName());
		}
		taskPackage.setTaskType(taskType);
		asyncTaskDao.insertTaskPackage(taskPackage);
		// 2.创建任务
		AsyncTaskBean task = new AsyncTaskBean();
		task.setCreateBy(createBy);
		task.setCreateTime(new Date());
		task.setId(taskId);
		task.setTaskType(taskType);
		task.setStatus(INIT);
		task.setPkgId(packageId);
		asyncTaskDao.insertTask(task);
		// 3.创建任务数据
		List<AsyncTaskDataBean> dataList = new ArrayList<>();
		if(Objects.nonNull(dataMap)) {
			Set<String> keySet = dataMap.keySet();
			for (String businessDataKey : keySet) {
				AsyncTaskDataBean data = new AsyncTaskDataBean();
				data.setBusinessDataKey(businessDataKey);
				data.setBusinessDataValue(dataMap.get(businessDataKey));
				data.setTaskId(taskId);
				data.setCreateBy(createBy);
				data.setCreateTime(new Date());
				data.setId(Tools.createUUID());
				dataList.add(data);
			}
			asyncTaskDao.batchInsertTaskData(dataList);
		}

	}

	@Override
	@Transactional(rollbackFor = Exception.class)
	public List<String> getTaskPackageIdList(String taskType, int batchSize) {
		String batchNo=UUIDUtil.getRandomID();
		asyncTaskDao.callPackageIds(taskType,Tools.getHostName(),batchSize,batchNo);
		List<String> packageIdList = asyncTaskDao.slectPackageIdList(batchNo);
		return packageIdList;
	}

	@Override
	public List<String> getTaskIdList(String packageId) {
		return asyncTaskDao.selectTaskByPackageId(packageId);
	}

	@Override
	@Transactional(rollbackFor = Exception.class)
	public Map<String, String> getTaskDataList(String taskId) {
		List<AsyncTaskDataBean> datas = asyncTaskDao.selectTaskDataList(taskId);
		Map<String, String> map = new HashMap<>();
		for (AsyncTaskDataBean asyncTaskDataBean : datas) {
			map.put(asyncTaskDataBean.getBusinessDataKey(), asyncTaskDataBean.getBusinessDataValue());
		}
		return map;
	}

	@Override
	@Transactional(rollbackFor = Exception.class)
	public void backupTaskPackage(String packageId) {
		// 新增历史记录数据
		asyncTaskDao.backupTaskPackage(packageId);
		// 删除任务包
		asyncTaskDao.deleteTaskPackageById(packageId);
	}

	/**
	 * 如果状态为2 把任务详细copy到success表 删除原有task 把原有taskdata移动到dataSuccess 删除原有的data
	 * 如果状态为3 判断重试次数是否大于3
	 * 
	 * 大于移动到错误数据
	 */
	@Override
	@Transactional(rollbackFor = Exception.class)
	public void updateTaskInfo(AsyncTaskBean asyncTaskBean) {

		AsyncTaskBean asyncTask = asyncTaskDao.selectTaskByTaskId(asyncTaskBean.getId());
		if (asyncTask == null || asyncTask.getId() == null) {
			LogUtil.error("无效任务id={}", asyncTaskBean.getId());
			return;
		}
		// 是否超过重试上限制
		boolean isOverRetryTimes = getRetryTimes(asyncTask) >= 3;

		if (FAILED.equals(asyncTaskBean.getStatus()) && !isOverRetryTimes) {
			// 重试次数+1
			asyncTaskBean.setRetryTimes(getRetryTimes(asyncTask) + 1);
		}
		// 1.更新任务数据
		asyncTaskDao.updateTaskInfo(asyncTaskBean);

		// 判断是否已经是错误数据
		boolean isErrorData = (FAILED.equals(asyncTaskBean.getStatus()) && isOverRetryTimes)
				|| (!SUCCESS.equals(asyncTask.getStatus()) && !FAILED.equals(asyncTaskBean.getStatus()));

		if (SUCCESS.equals(asyncTaskBean.getStatus())) {

			backupSuccessTask(asyncTaskBean.getId());
			backupSuccessData(asyncTaskBean.getId());
			asyncTaskDao.deleteTaskById(asyncTaskBean.getId());
			asyncTaskDao.deleteTaskDateByTaskId(asyncTaskBean.getId());

		} else if (isErrorData) {

			backupErrorTask(asyncTaskBean.getId());
			backupErrorData(asyncTaskBean.getId());
			asyncTaskDao.deleteTaskById(asyncTaskBean.getId());
			asyncTaskDao.deleteTaskDateByTaskId(asyncTaskBean.getId());
		}

	}

	private void backupErrorData(String taskId) {
		backupTaskData(taskId, ERROR_TAKS_DATA_TABLE);
	}

	private void backupErrorTask(String taskId) {
		backupTask(taskId, ERROR_TAKS_TABLE);

	}

	private void backupSuccessData(String taskId) {
		backupTaskData(taskId, SUCCESS_TAKS__DATA_TABLE);
	}

	private void backupSuccessTask(String taskId) {
		backupTask(taskId, SUCCESS_TAKS_TABLE);

	}

	private void backupTask(String taskId, String tableName) {
		asyncTaskDao.backupTask(taskId, tableName);
	}

	private void backupTaskData(String taskId, String tableName) {
		asyncTaskDao.backupTaskData(taskId, tableName);
	}

	private int getRetryTimes(AsyncTaskBean asyncTask) {
		return asyncTask.getRetryTimes() == null ? 0 : asyncTask.getRetryTimes();
	}

	@Override
	@Transactional(rollbackFor = Exception.class)
	public void recoverOvertimeTask(String overTime) {
		// 1.查询到符合超时的packageIdList
		List<String> packageIdList = asyncTaskDao.selectOvertimePackageIdList(Tools.getHostName(), overTime);
		if (!packageIdList.isEmpty()) {
			// 2.更新package
			asyncTaskDao.recoverTaskPackage(packageIdList);
			// 3。更新task
			asyncTaskDao.recoverTask(packageIdList);
		}
	}

	@Override
	public List<AsyncTaskBean> findInPage(AsyncTaskBean task, PaginationVO pageVo) {
		List<AsyncTaskBean> taskList = asyncTaskDao.selectInPage(task, pageVo);
		// 获取产品的
		for (AsyncTaskBean asyncTask : taskList) {
			asyncTask.setTaskData(getTaskDataList(asyncTask.getId()));
		}
		return taskList;
	}

	@Override
	public Integer findTotal(AsyncTaskBean task) {
		return asyncTaskDao.selectTotalRecord(task);
	}

}
